# -*- coding: utf-8 -*-
"""Tests for various magic functions."""

import collections
import gc
import io
import json
import os
import platform
import re
import shlex
import signal
import sys
import warnings
from importlib import invalidate_caches
from io import StringIO
from pathlib import Path
from time import sleep
from threading import Thread
from subprocess import CalledProcessError
from textwrap import dedent
from unittest import TestCase, mock

import pytest

from IPython import get_ipython
from IPython.core import magic
from IPython.core.error import UsageError
from IPython.core.magic import (
    Magics,
    cell_magic,
    line_magic,
    magics_class,
    register_cell_magic,
    register_line_magic,
)
from IPython.core.magics import code, execution, logging, osm, script
from IPython.core.history import HistoryOutput
from IPython.testing import decorators as dec
from IPython.testing import tools as tt
from IPython.utils.io import capture_output
from IPython.utils.process import find_cmd
from IPython.utils.tempdir import TemporaryDirectory, TemporaryWorkingDirectory
from IPython.utils.syspathcontext import prepended_to_syspath

from .test_debugger import PdbTestInput

from tempfile import NamedTemporaryFile


@magic.magics_class
class DummyMagics(magic.Magics):
    pass


def test_extract_code_ranges():
    instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
    expected = [
        (0, 1),
        (2, 3),
        (4, 6),
        (6, 9),
        (9, 14),
        (16, None),
        (None, 9),
        (9, None),
        (None, 13),
        (None, None),
    ]
    actual = list(code.extract_code_ranges(instr))
    assert actual == expected


def test_extract_symbols():
    source = """import foo\na = 10\ndef b():\n    return 42\n\n\nclass A: pass\n\n\n"""
    symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
    expected = [
        ([], ["a"]),
        (["def b():\n    return 42\n"], []),
        (["class A: pass\n"], []),
        (["class A: pass\n", "def b():\n    return 42\n"], []),
        (["class A: pass\n"], ["a"]),
        ([], ["z"]),
    ]
    for symbols, exp in zip(symbols_args, expected):
        assert code.extract_symbols(source, symbols) == exp


def test_extract_symbols_raises_exception_with_non_python_code():
    source = "=begin A Ruby program :)=end\n" "def hello\n" "puts 'Hello world'\n" "end"
    with pytest.raises(SyntaxError):
        code.extract_symbols(source, "hello")


def test_magic_not_found():
    # magic not found raises UsageError
    with pytest.raises(UsageError):
        _ip.run_line_magic("doesntexist", "")

    # ensure result isn't success when a magic isn't found
    result = _ip.run_cell("%doesntexist")
    assert isinstance(result.error_in_exec, UsageError)


def test_cell_magic_not_found():
    # magic not found raises UsageError
    with pytest.raises(UsageError):
        _ip.run_cell_magic("doesntexist", "line", "cell")

    # ensure result isn't success when a magic isn't found
    result = _ip.run_cell("%%doesntexist")
    assert isinstance(result.error_in_exec, UsageError)


def test_magic_error_status():
    def fail(shell):
        1 / 0

    _ip.register_magic_function(fail)
    result = _ip.run_cell("%fail")
    assert isinstance(result.error_in_exec, ZeroDivisionError)


def test_config():
    """test that config magic does not raise
    can happen if Configurable init is moved too early into
    Magics.__init__ as then a Config object will be registered as a
    magic.
    """
    ## should not raise.
    _ip.run_line_magic("config", "")


def test_config_available_configs():
    """test that config magic prints available configs in unique and
    sorted order."""
    with capture_output() as captured:
        _ip.run_line_magic("config", "")

    stdout = captured.stdout
    config_classes = stdout.strip().split("\n")[1:]
    assert config_classes == sorted(set(config_classes))


def test_config_print_class():
    """test that config with a classname prints the class's options."""
    with capture_output() as captured:
        _ip.run_line_magic("config", "TerminalInteractiveShell")

    stdout = captured.stdout
    assert re.match(
        "TerminalInteractiveShell.* options", stdout.splitlines()[0]
    ), f"{stdout}\n\n1st line of stdout not like 'TerminalInteractiveShell.* options'"


def test_rehashx():
    # clear up everything
    _ip.alias_manager.clear_aliases()
    del _ip.db["syscmdlist"]

    _ip.run_line_magic("rehashx", "")
    # Practically ALL ipython development systems will have more than 10 aliases

    assert len(_ip.alias_manager.aliases) > 10
    for name, cmd in _ip.alias_manager.aliases:
        # we must strip dots from alias names
        assert "." not in name

    # rehashx must fill up syscmdlist
    scoms = _ip.db["syscmdlist"]
    assert len(scoms) > 10


def test_magic_parse_options():
    """Test that we don't mangle paths when parsing magic options."""
    ip = get_ipython()
    path = "c:\\x"
    m = DummyMagics(ip)
    opts = m.parse_options("-f %s" % path, "f:")[0]
    # argv splitting is os-dependent
    if os.name == "posix":
        expected = "c:x"
    else:
        expected = path
    assert opts["f"] == expected


def test_magic_parse_long_options():
    """Magic.parse_options can handle --foo=bar long options"""
    ip = get_ipython()
    m = DummyMagics(ip)
    opts, _ = m.parse_options("--foo --bar=bubble", "a", "foo", "bar=")
    assert "foo" in opts
    assert "bar" in opts
    assert opts["bar"] == "bubble"


def doctest_hist_f():
    """Test %hist -f with temporary filename.

    In [9]: import tempfile, os

    In [10]: fd, tfile = tempfile.mkstemp('.py','tmp-ipython-')
    In [11]: os.close(fd)

    In [12]: %history -nl -y -f $tfile 3

    In [14]: import os; os.unlink(tfile)
    """


def doctest_hist_op():
    """Test %hist -op

    In [1]: class b(float):
       ...:     pass
       ...:

    In [2]: class s(object):
       ...:     def __str__(self):
       ...:         return 's'
       ...:

    In [3]:

    In [4]: class r(b):
       ...:     def __repr__(self):
       ...:         return 'r'
       ...:

    In [5]: class sr(s,r): pass
       ...:

    In [6]:

    In [7]: bb=b()

    In [8]: ss=s()

    In [9]: rr=r()

    In [10]: ssrr=sr()

    In [11]: 4.5
    Out[11]: 4.5

    In [12]: str(ss)
    Out[12]: 's'

    In [13]:

    In [14]: %hist -op
    >>> class b:
    ...     pass
    ...
    >>> class s(b):
    ...     def __str__(self):
    ...         return 's'
    ...
    >>>
    >>> class r(b):
    ...     def __repr__(self):
    ...         return 'r'
    ...
    >>> class sr(s,r): pass
    >>>
    >>> bb=b()
    >>> ss=s()
    >>> rr=r()
    >>> ssrr=sr()
    >>> 4.5
    4.5
    >>> str(ss)
    's'
    >>>
    """


def test_hist_pof():
    ip = get_ipython()
    ip.run_cell("1+2", store_history=True)
    # raise Exception(ip.history_manager.session_number)
    # raise Exception(list(ip.history_manager._get_range_session()))
    with TemporaryDirectory() as td:
        tf = os.path.join(td, "hist.py")
        ip.run_line_magic("history", "-pof %s" % tf)
        assert os.path.isfile(tf)


def test_macro():
    ip = get_ipython()
    ip.history_manager.reset()  # Clear any existing history.
    cmds = ["a=1", "def b():\n  return a**2", "print(a,b())"]
    for i, cmd in enumerate(cmds, start=1):
        ip.history_manager.store_inputs(i, cmd)
    ip.run_line_magic("macro", "test 1-3")
    assert ip.user_ns["test"].value == "\n".join(cmds) + "\n"

    # List macros
    assert "test" in ip.run_line_magic("macro", "")


def test_macro_run():
    """Test that we can run a multi-line macro successfully."""
    ip = get_ipython()
    ip.history_manager.reset()
    cmds = ["a=10", "a+=1", "print(a)", "%macro test 2-3"]
    for cmd in cmds:
        ip.run_cell(cmd, store_history=True)
    assert ip.user_ns["test"].value == "a+=1\nprint(a)\n"
    with tt.AssertPrints("12"):
        ip.run_cell("test")
    with tt.AssertPrints("13"):
        ip.run_cell("test")


def test_magic_magic():
    """Test %magic"""
    ip = get_ipython()
    with capture_output() as captured:
        ip.run_line_magic("magic", "")

    stdout = captured.stdout
    assert "%magic" in stdout
    assert "IPython" in stdout
    assert "Available" in stdout


@dec.skipif_not_numpy
def test_numpy_reset_array_undec():
    "Test '%reset array' functionality"
    _ip.ex("import numpy as np")
    _ip.ex("a = np.empty(2)")
    assert "a" in _ip.user_ns
    _ip.run_line_magic("reset", "-f array")
    assert "a" not in _ip.user_ns


@pytest.fixture()
def underscore_not_in_builtins():
    import builtins

    if "_" in builtins.__dict__:
        del builtins.__dict__["_"]


def test_reset_out(underscore_not_in_builtins):
    "Test '%reset out' magic"
    _ip.run_cell("parrot = 'dead'", store_history=True)
    # test '%reset -f out', make an Out prompt
    _ip.run_cell("parrot", store_history=True)
    assert "dead" in [_ip.user_ns[x] for x in ("_", "__", "___")]
    _ip.run_line_magic("reset", "-f out")
    assert "dead" not in [_ip.user_ns[x] for x in ("_", "__", "___")]
    assert len(_ip.user_ns["Out"]) == 0


def test_reset_in():
    "Test '%reset in' magic"
    # test '%reset -f in'
    _ip.run_cell("parrot", store_history=True)
    assert "parrot" in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
    _ip.run_line_magic("reset", "-f in")
    assert "parrot" not in [_ip.user_ns[x] for x in ("_i", "_ii", "_iii")]
    assert len(set(_ip.user_ns["In"])) == 1


def test_reset_dhist():
    "Test '%reset dhist' magic"
    _ip.run_cell("tmp = [d for d in _dh]")  # copy before clearing
    _ip.run_line_magic("cd", os.path.dirname(pytest.__file__))
    _ip.run_line_magic("cd", "-")
    assert len(_ip.user_ns["_dh"]) > 0
    _ip.run_line_magic("reset", "-f dhist")
    assert len(_ip.user_ns["_dh"]) == 0
    _ip.run_cell("_dh = [d for d in tmp]")  # restore


def test_reset_in_length():
    "Test that '%reset in' preserves In[] length"
    _ip.run_cell("print 'foo'")
    _ip.run_cell("reset -f in")
    assert len(_ip.user_ns["In"]) == _ip.displayhook.prompt_count + 1


class TestResetErrors(TestCase):
    def test_reset_redefine(self):
        @magics_class
        class KernelMagics(Magics):
            @line_magic
            def less(self, shell):
                pass

        _ip.register_magics(KernelMagics)

        with self.assertLogs() as cm:
            # hack, we want to just capture logs, but assertLogs fails if not
            # logs get produce.
            # so log one things we ignore.
            import logging as log_mod

            log = log_mod.getLogger()
            log.info("Nothing")
            # end hack.
            _ip.run_cell("reset -f")

        assert len(cm.output) == 1
        for out in cm.output:
            assert "Invalid alias" not in out


def test_tb_syntaxerror():
    """test %tb after a SyntaxError"""
    ip = get_ipython()
    ip.run_cell("for")

    # trap and validate stdout
    save_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        ip.run_cell("%tb")
        out = sys.stdout.getvalue()
    finally:
        sys.stdout = save_stdout
    # trim output, and only check the last line
    last_line = out.rstrip().splitlines()[-1].strip()
    assert last_line == "SyntaxError: invalid syntax"


def test_time():
    ip = get_ipython()

    with tt.AssertPrints("Wall time: "):
        ip.run_cell("%time None")

    ip.run_cell("def f(kmjy):\n" "    %time print (2*kmjy)")

    with tt.AssertPrints("Wall time: "):
        with tt.AssertPrints("hihi", suppress=False):
            ip.run_cell("f('hi')")

    with tt.AssertPrints("a space"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell('%time print("a space")')


# ';' at the end of %time prevents instruction value to be printed.
# This tests fix for #13837.
def test_time_no_output_with_semicolon():
    ip = get_ipython()

    # Test %time cases
    with tt.AssertPrints(" 123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%time 123000+456")

    with tt.AssertNotPrints(" 123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%time 123000+456;")

    with tt.AssertPrints(" 123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%time 123000+456 # Comment")

    with tt.AssertNotPrints(" 123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%time 123000+456; # Comment")

    with tt.AssertPrints(" 123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%time 123000+456 # ;Comment")

    # Test %%time cases
    with tt.AssertPrints("123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%%time\n123000+456\n\n\n")

    with tt.AssertNotPrints("123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%%time\n123000+456;\n\n\n")

    with tt.AssertPrints("123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%%time\n123000+456  # Comment\n\n\n")

    with tt.AssertNotPrints("123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%%time\n123000+456;  # Comment\n\n\n")

    with tt.AssertPrints("123456"):
        with tt.AssertPrints("Wall time: ", suppress=False):
            with tt.AssertPrints("CPU times: ", suppress=False):
                ip.run_cell("%%time\n123000+456  # ;Comment\n\n\n")


def test_time_last_not_expression():
    ip.run_cell("%%time\n" "var_1 = 1\n" "var_2 = 2\n")
    assert ip.user_ns["var_1"] == 1
    del ip.user_ns["var_1"]
    assert ip.user_ns["var_2"] == 2
    del ip.user_ns["var_2"]


@dec.skip_win32
def test_time2():
    ip = get_ipython()

    with tt.AssertPrints("CPU times: user "):
        ip.run_cell("%time None")


def test_time3():
    """Erroneous magic function calls, issue gh-3334"""
    ip = get_ipython()
    ip.user_ns.pop("run", None)

    with tt.AssertNotPrints("not found", channel="stderr"):
        ip.run_cell("%%time\n" "run = 0\n" "run += 1")


def test_multiline_time(underscore_not_in_builtins):
    """Make sure last statement from time return a value."""
    ip = get_ipython()
    ip.user_ns.pop("run", None)

    ip.run_cell(
        dedent(
            """\
        %%time
        a = "ho"
        b = "hey"
        a+b
        """
        )
    )
    assert ip.user_ns_hidden["_"] == "hohey"


def test_time_local_ns():
    """
    Test that local_ns is actually global_ns when running a cell magic
    """
    ip = get_ipython()
    ip.run_cell("%%time\n" "myvar = 1")
    assert ip.user_ns["myvar"] == 1
    del ip.user_ns["myvar"]


def test_time_microseconds_display():
    """Ensure ASCII is used when necessary"""
    with mock.patch("sys.stdout", io.TextIOWrapper(StringIO(), encoding="utf-8")):
        assert execution._format_time(0.000001) == "1 \u03bcs"
    with mock.patch("sys.stdout", io.TextIOWrapper(StringIO(), encoding="ascii")):
        assert execution._format_time(0.000001) == "1 us"


# Test %%capture magic. Added to test issue #13926
def test_capture():
    ip = get_ipython()

    # Test %%capture nominal case
    ip.run_cell("%%capture abc\n1+2")
    with tt.AssertPrints("True", suppress=False):
        ip.run_cell("'abc' in locals()")
    with tt.AssertPrints("True", suppress=False):
        ip.run_cell("'outputs' in dir(abc)")
    with tt.AssertPrints("3", suppress=False):
        ip.run_cell("abc.outputs[0]")

    # Test %%capture with ';' at end of expression
    ip.run_cell("%%capture abc\n7+8;")
    with tt.AssertPrints("False", suppress=False):
        ip.run_cell("'abc' in locals()")


def test_doctest_mode():
    "Toggle doctest_mode twice, it should be a no-op and run without error"
    _ip.run_line_magic("doctest_mode", "")
    _ip.run_line_magic("doctest_mode", "")


def test_parse_options():
    """Tests for basic options parsing in magics."""
    # These are only the most minimal of tests, more should be added later.  At
    # the very least we check that basic text/unicode calls work OK.
    m = DummyMagics(_ip)
    assert m.parse_options("foo", "")[1] == "foo"
    assert m.parse_options("foo", "")[1] == "foo"


def test_parse_options_preserve_non_option_string():
    """Test to assert preservation of non-option part of magic-block, while parsing magic options."""
    m = DummyMagics(_ip)
    opts, stmt = m.parse_options(
        " -n1  -r 13 _ = 314 + foo", "n:r:", preserve_non_opts=True
    )
    assert opts == {"n": "1", "r": "13"}
    assert stmt == "_ = 314 + foo"


def test_run_magic_preserve_code_block():
    """Test to assert preservation of non-option part of magic-block, while running magic."""
    _ip.user_ns["spaces"] = []
    _ip.run_line_magic(
        "timeit", "-n1 -r1 spaces.append([s.count(' ') for s in ['document']])"
    )
    assert _ip.user_ns["spaces"] == [[0]]


def test_dirops():
    """Test various directory handling operations."""
    # curpath = lambda :os.path.splitdrive(os.getcwd())[1].replace('\\','/')
    curpath = os.getcwd
    startdir = os.getcwd()
    ipdir = os.path.realpath(_ip.ipython_dir)
    try:
        _ip.run_line_magic("cd", '"%s"' % ipdir)
        assert curpath() == ipdir
        _ip.run_line_magic("cd", "-")
        assert curpath() == startdir
        _ip.run_line_magic("pushd", '"%s"' % ipdir)
        assert curpath() == ipdir
        _ip.run_line_magic("popd", "")
        assert curpath() == startdir
    finally:
        os.chdir(startdir)


def test_cd_force_quiet():
    """Test OSMagics.cd_force_quiet option"""
    _ip.config.OSMagics.cd_force_quiet = True
    osmagics = osm.OSMagics(shell=_ip)

    startdir = os.getcwd()
    ipdir = os.path.realpath(_ip.ipython_dir)

    try:
        with tt.AssertNotPrints(ipdir):
            osmagics.cd('"%s"' % ipdir)
        with tt.AssertNotPrints(startdir):
            osmagics.cd("-")
    finally:
        os.chdir(startdir)


def test_xmode():
    # Calling xmode three times should be a no-op
    xmode = _ip.InteractiveTB.mode
    for i in range(5):
        _ip.run_line_magic("xmode", "")
    assert _ip.InteractiveTB.mode == xmode


def test_reset_hard():
    monitor = []

    class A(object):
        def __del__(self):
            monitor.append(1)

        def __repr__(self):
            return "<A instance>"

    _ip.user_ns["a"] = A()
    _ip.run_cell("a")

    assert monitor == []
    _ip.run_line_magic("reset", "-f")
    assert monitor == [1]


class TestXdel(tt.TempFileMixin):
    def test_xdel(self):
        """Test that references from %run are cleared by xdel."""
        src = (
            "class A(object):\n"
            "    monitor = []\n"
            "    def __del__(self):\n"
            "        self.monitor.append(1)\n"
            "a = A()\n"
        )
        self.mktmp(src)
        # %run creates some hidden references...
        _ip.run_line_magic("run", "%s" % self.fname)
        # ... as does the displayhook.
        _ip.run_cell("a")

        monitor = _ip.user_ns["A"].monitor
        assert monitor == []

        _ip.run_line_magic("xdel", "a")

        # Check that a's __del__ method has been called.
        gc.collect(0)
        assert monitor == [1]


def doctest_who():
    """doctest for %who

    In [1]: %reset -sf

    In [2]: alpha = 123

    In [3]: beta = 'beta'

    In [4]: %who int
    alpha

    In [5]: %who str
    beta

    In [6]: %whos
    Variable   Type    Data/Info
    ----------------------------
    alpha      int     123
    beta       str     beta

    In [7]: %who_ls
    Out[7]: ['alpha', 'beta']
    """


def test_whos():
    """Check that whos is protected against objects where repr() fails."""

    class A(object):
        def __repr__(self):
            raise Exception()

    _ip.user_ns["a"] = A()
    _ip.run_line_magic("whos", "")


@pytest.mark.parametrize(
    "input,expected",
    (
        (
            "The quick brown fox jumps over the lazy dog",
            "The quick brown fox jumps over the lazy dog",
        ),
        (
            " ".join(["The quick brown fox jumps over the lazy dog"] * 2),
            "The quick brown fox jumps<...>x jumps over the lazy dog",
        ),
        ("\nThis is \n\na long\n\nstring\n", r"\nThis is \n\na long\n\nstring\n"),
        (
            "\rThis is \r\ra long\r\rstring\r",
            r"\rThis is \r\ra long\r\rstring\r",
        ),
    ),
)
def test_whos_longstr(input, expected):
    ip = get_ipython()
    ip.user_ns["input"] = input
    ip.user_ns["expected"] = expected
    with capture_output() as captured:
        ip.run_line_magic("whos", "")
    stdout = captured.stdout
    from_whos = " ".join(
        re.split(
            r"\s+str\s+",
            [l for l in stdout.splitlines() if l.startswith("expected")][0],
        )[1:]
    )
    assert expected == from_whos


def test_whos_len_namedtuple():
    _ip = get_ipython()
    _ip.run_line_magic("reset", "-f")
    _ip.user_ns["alpha"] = 123
    _ip.user_ns["beta"] = "test"
    _ip.user_ns["nt"] = collections.namedtuple("MyNamedTuple", "col1 col2")
    _ip.user_ns["x"] = _ip.user_ns["nt"]("a", "b")
    expected = (
        "Variable   Type            Data/Info\n"
        "------------------------------------\n"
        "alpha      int             123\n"
        "beta       str             test\n"
        "nt         type            <class 'tests.test_magic.MyNamedTuple'>\n"
        "x          MyNamedTuple    MyNamedTuple(col1='a', col2='b')\n"
    )
    with capture_output() as captured:
        ip.run_line_magic("whos", "")
    stdout = captured.stdout
    assert stdout == expected.strip() + "\n"


@dec.skip_without("pandas")
def test_whos_len_pandas():
    import pandas as pd

    _ip = get_ipython()
    _ip.run_line_magic("reset", "-f")
    df = pd.DataFrame({"a": range(10), "b": range(10, 20)})
    _ip.user_ns["df"] = df
    s = df["a"]
    _ip.user_ns["s"] = s
    expected = (
        "Variable   Type         Data/Info\n"
        "---------------------------------\n"
        "df         DataFrame    Shape: (10, 2)\n"
        "s          Series       Shape: (10,)\n"
    )
    with capture_output() as captured:
        ip.run_line_magic("whos", "")
    stdout = captured.stdout
    assert stdout == expected.strip() + "\n"


def doctest_precision():
    """doctest for %precision

    In [1]: f = get_ipython().display_formatter.formatters['text/plain']

    In [2]: %precision 5
    Out[2]: '%.5f'

    In [3]: f.float_format
    Out[3]: '%.5f'

    In [4]: %precision %e
    Out[4]: '%e'

    In [5]: f(3.1415927)
    Out[5]: '3.141593e+00'
    """


def test_debug_magic():
    """Test debugging a small code with %debug

    In [1]: with PdbTestInput(['c']):
       ...:     %debug print("a b") #doctest: +ELLIPSIS
       ...:
    ...
    ipdb> c
    a b
    In [2]:
    """


def test_debug_magic_locals():
    """Test debugging a small code with %debug with locals

    In [1]: with PdbTestInput(['c']):
       ...:     def fun():
       ...:         res = 1
       ...:         %debug print(res)
       ...:     fun()
       ...:
    ...
    ipdb> c
    1
    In [2]:
    """


def test_psearch():
    with tt.AssertPrints("dict.fromkeys"):
        _ip.run_cell("dict.fr*?")
    with tt.AssertPrints("π.is_integer"):
        _ip.run_cell("π = 3.14;\nπ.is_integ*?")


def test_timeit_shlex():
    """test shlex issues with timeit (#1109)"""
    _ip.ex("def f(*a,**kw): pass")
    _ip.run_line_magic("timeit", '-n1 "this is a bug".count(" ")')
    _ip.run_line_magic("timeit", '-r1 -n1 f(" ", 1)')
    _ip.run_line_magic("timeit", '-r1 -n1 f(" ", 1, " ", 2, " ")')
    _ip.run_line_magic("timeit", '-r1 -n1 ("a " + "b")')
    _ip.run_line_magic("timeit", '-r1 -n1 f("a " + "b")')
    _ip.run_line_magic("timeit", '-r1 -n1 f("a " + "b ")')


def test_timeit_special_syntax():
    "Test %%timeit with IPython special syntax"

    @register_line_magic
    def lmagic(line):
        ip = get_ipython()
        ip.user_ns["lmagic_out"] = line

    # line mode test
    _ip.run_line_magic("timeit", "-n1 -r1 %lmagic my line")
    assert _ip.user_ns["lmagic_out"] == "my line"
    # cell mode test
    _ip.run_cell_magic("timeit", "-n1 -r1", "%lmagic my line2")
    assert _ip.user_ns["lmagic_out"] == "my line2"


def test_timeit_return():
    """
    test whether timeit -o return object
    """

    res = _ip.run_line_magic("timeit", "-n10 -r10 -o 1")
    assert res is not None


def test_timeit_save():
    """
    test whether timeit -v save object
    """

    name = "some_variable_name"
    _ip.run_line_magic("timeit", "-n10 -r10 -v %s 1" % name)
    assert _ip.user_ns[name] is not None


def test_timeit_quiet():
    """
    test quiet option of timeit magic
    """
    with tt.AssertNotPrints("loops"):
        _ip.run_cell("%timeit -n1 -r1 -q 1")


def test_timeit_return_quiet():
    with tt.AssertNotPrints("loops"):
        res = _ip.run_line_magic("timeit", "-n1 -r1 -q -o 1")
    assert res is not None


def test_timeit_invalid_return():
    with pytest.raises(SyntaxError):
        _ip.run_line_magic("timeit", "return")


def test_timeit_raise_on_interrupt():
    ip = get_ipython()

    with pytest.raises(KeyboardInterrupt):
        thread = Thread(target=_interrupt_after_1s)
        thread.start()
        ip.run_cell_magic("timeit", "", "from time import sleep; sleep(2)")
        thread.join()


@dec.skipif(execution.profile is None)
def test_prun_special_syntax():
    "Test %%prun with IPython special syntax"

    @register_line_magic
    def lmagic(line):
        ip = get_ipython()
        ip.user_ns["lmagic_out"] = line

    # line mode test
    _ip.run_line_magic("prun", "-q %lmagic my line")
    assert _ip.user_ns["lmagic_out"] == "my line"
    # cell mode test
    _ip.run_cell_magic("prun", "-q", "%lmagic my line2")
    assert _ip.user_ns["lmagic_out"] == "my line2"


@dec.skipif(execution.profile is None)
def test_prun_quotes():
    "Test that prun does not clobber string escapes (GH #1302)"
    _ip.run_line_magic("prun", r"-q x = '\t'")
    assert _ip.user_ns["x"] == "\t"


def test_extension():
    # Debugging information for failures of this test
    assert "daft_extension" not in sys.modules
    daft_path = os.path.join(os.path.dirname(__file__), "fake_ext_dir")
    if daft_path in sys.path:
        # it is likely pytest added this path
        # when scanning for tests
        sys.path.remove(daft_path)
    print("sys.path:")
    for p in sys.path:
        print(" ", p)
    print("CWD", os.getcwd())

    with pytest.raises(ModuleNotFoundError):
        import daft_extension

    with pytest.raises(ModuleNotFoundError):
        _ip.run_line_magic("load_ext", "daft_extension")
    sys.path.insert(0, daft_path)
    import daft_extension

    try:
        _ip.user_ns.pop("arq", None)
        invalidate_caches()  # Clear import caches
        _ip.run_line_magic("load_ext", "daft_extension")
        assert _ip.user_ns["arq"] == 185
        _ip.run_line_magic("unload_ext", "daft_extension")
        assert "arq" not in _ip.user_ns
    finally:
        sys.path.remove(daft_path)


def test_notebook_export_json():
    pytest.importorskip("nbformat")
    from nbformat import read, sign

    _ip = get_ipython()
    _ip.history_manager.reset()  # Clear any existing history.
    _ip.run_line_magic("config", "NotebookNotary.algorithm = 'sha384'")
    cmds = ["a=1", "def b():\n  return a**2", "print('noël, été', b())"]
    for i, cmd in enumerate(cmds, start=1):
        _ip.history_manager.store_inputs(i, cmd)
    with TemporaryDirectory() as td:
        outfile = os.path.join(td, "nb.ipynb")
        _ip.run_line_magic("notebook", "%s" % outfile)
        with open(outfile) as f:
            exported = json.load(f)
        nb = read(outfile, as_version=4)

    # check metadata
    language_info = exported["metadata"]["language_info"]
    assert language_info["name"] == "python"
    assert language_info["file_extension"] == ".py"
    assert language_info["version"] == platform.python_version()

    kernelspec = exported["metadata"]["kernelspec"]
    assert kernelspec["language"] == "python"

    # Check if notebook is trusted
    notary = sign.NotebookNotary(algorithm="sha384")
    is_trusted = notary.check_signature(nb)
    assert is_trusted, "Exported notebook should be trusted"


def test_notebook_export_json_with_output():
    """Tests if notebook export correctly captures outputs, errors, display outputs, and stream outputs."""
    pytest.importorskip("nbformat")
    pytest.importorskip("nbclient")
    import nbformat
    from nbclient import NotebookClient

    _ip = get_ipython()
    _ip.history_manager.reset()
    _ip.colors = "neutral"
    _ip.execution_count = 1

    try:
        commands = [
            "1/0",
            "print('test')",
            "display('test')",
            "1+1",
            "display('a'), display('b')",
            "import sys\nprint('test', file=sys.stderr)",
        ]

        clean_nb = nbformat.v4.new_notebook(
            cells=[nbformat.v4.new_code_cell(source=cmd) for cmd in commands]
        )

        with TemporaryDirectory() as td:
            outfile = os.path.join(td, "nb.ipynb")
            client = NotebookClient(
                clean_nb,
                timeout=600,
                kernel_name="python3",
                resources={"metadata": {"path": td}},
                allow_errors=True,
            )
            client.execute()
            nbformat.write(clean_nb, outfile)
            expected_nb = nbformat.read(outfile, as_version=4)

        for cmd in commands:
            _ip.run_cell(cmd, store_history=True, silent=False)
            print(f"\n{_ip.history_manager.outputs}\n")

        with TemporaryDirectory() as td:
            outfile = os.path.join(td, "nb.ipynb")
            _ip.run_cell(f"%notebook {outfile}", store_history=True)
            sleep(2)
            actual_nb = nbformat.read(outfile, as_version=4)

        assert len(actual_nb["cells"]) == len(commands)
        assert len(expected_nb["cells"]) == len(commands)

        for i, command in enumerate(commands):
            actual = actual_nb["cells"][i]
            expected = expected_nb["cells"][i]
            assert expected["source"] == command
            assert actual["source"] == expected["source"]
            assert (
                actual["outputs"] == expected["outputs"]
            ), f"Outputs do not match for cell {i+1} with source {command!r}"
    finally:
        _ip.colors = "nocolor"


def test_notebook_export_single_display():
    """Test that multiple MIME types create a single display_data output, not multiple."""
    pytest.importorskip("nbformat")

    _ip = get_ipython()
    orig_outputs = _ip.history_manager.outputs.copy()
    orig_execution_count = _ip.execution_count
    _ip.history_manager.reset()

    try:
        execution_count = _ip.execution_count = 1
        _ip.run_cell("'test'", store_history=True, silent=False)

        # Mock display output with multiple MIME types
        test_display_history = HistoryOutput(
            output_type="display_data",
            bundle={"text/plain": "test", "text/html": "<div>test</div>"},
        )
        _ip.history_manager.outputs[execution_count] = [test_display_history]

        with TemporaryDirectory() as td:
            outfile = f"{td}/test.ipynb"
            _ip.run_cell(f"%notebook {outfile}", store_history=True, silent=False)

            # Verify single display_data output with both MIME types
            with open(outfile, "r") as f:
                nb = json.load(f)

        cell = nb["cells"][0]
        display_outputs = [
            out for out in cell["outputs"] if out["output_type"] == "display_data"
        ]

        assert (
            len(display_outputs) == 1
        ), f"Expected 1 display_data output, got {len(display_outputs)}"

        output_data = display_outputs[0]["data"]
        assert set(output_data.keys()) == {"text/plain", "text/html"}
        assert output_data["text/plain"] == ["test"]
        assert output_data["text/html"] == ["<div>test</div>"]

    finally:
        _ip.history_manager.outputs = orig_outputs
        _ip.execution_count = orig_execution_count


class TestEnv(TestCase):
    def test_env(self):
        env = _ip.run_line_magic("env", "")
        self.assertTrue(isinstance(env, dict))

    def test_env_secret(self):
        env = _ip.run_line_magic("env", "")
        hidden = "<hidden>"
        with mock.patch.dict(
            os.environ,
            {
                "API_KEY": "abc123",
                "SECRET_THING": "ssshhh",
                "JUPYTER_TOKEN": "",
                "VAR": "abc",
            },
        ):
            env = _ip.run_line_magic("env", "")
        assert env["API_KEY"] == hidden
        assert env["SECRET_THING"] == hidden
        assert env["JUPYTER_TOKEN"] == hidden
        assert env["VAR"] == "abc"

    def test_env_get_set_simple(self):
        env = _ip.run_line_magic("env", "var val1")
        self.assertEqual(env, None)
        self.assertEqual(os.environ["var"], "val1")
        self.assertEqual(_ip.run_line_magic("env", "var"), "val1")
        env = _ip.run_line_magic("env", "var=val2")
        self.assertEqual(env, None)
        self.assertEqual(os.environ["var"], "val2")

    def test_env_get_set_complex(self):
        env = _ip.run_line_magic("env", "var 'val1 '' 'val2")
        self.assertEqual(env, None)
        self.assertEqual(os.environ["var"], "'val1 '' 'val2")
        self.assertEqual(_ip.run_line_magic("env", "var"), "'val1 '' 'val2")
        env = _ip.run_line_magic("env", 'var=val2 val3="val4')
        self.assertEqual(env, None)
        self.assertEqual(os.environ["var"], 'val2 val3="val4')

    def test_env_set_bad_input(self):
        self.assertRaises(UsageError, lambda: _ip.run_line_magic("set_env", "var"))

    def test_env_set_whitespace(self):
        self.assertRaises(UsageError, lambda: _ip.run_line_magic("env", "var A=B"))


def check_ident(magic):
    # Manually called, we get the result
    out = _ip.run_cell_magic(magic, "a", "b")
    assert out == ("a", "b")
    # Via run_cell, it goes into the user's namespace via displayhook
    _ip.run_cell("%%" + magic + " c\nd\n")
    assert _ip.user_ns["_"] == ("c", "d\n")


def test_cell_magic_func_deco(underscore_not_in_builtins):
    "Cell magic using simple decorator"

    @register_cell_magic
    def cellm(line, cell):
        return line, cell

    check_ident("cellm")


def test_cell_magic_reg(underscore_not_in_builtins):
    "Cell magic manually registered"

    def cellm(line, cell):
        return line, cell

    _ip.register_magic_function(cellm, "cell", "cellm2")
    check_ident("cellm2")


def test_cell_magic_class(underscore_not_in_builtins):
    "Cell magics declared via a class"

    @magics_class
    class MyMagics(Magics):
        @cell_magic
        def cellm3(self, line, cell):
            return line, cell

    _ip.register_magics(MyMagics)
    check_ident("cellm3")


def test_cell_magic_class2(underscore_not_in_builtins):
    "Cell magics declared via a class, #2"

    @magics_class
    class MyMagics2(Magics):
        @cell_magic("cellm4")
        def cellm33(self, line, cell):
            return line, cell

    _ip.register_magics(MyMagics2)
    check_ident("cellm4")
    # Check that nothing is registered as 'cellm33'
    c33 = _ip.find_cell_magic("cellm33")
    assert c33 == None


def test_file():
    """Basic %%writefile"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, "file1")
        ip.run_cell_magic(
            "writefile",
            fname,
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line2" in s


@dec.skip_win32
def test_file_single_quote():
    """Basic %%writefile with embedded single quotes"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, "'file1'")
        ip.run_cell_magic(
            "writefile",
            fname,
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line2" in s


@dec.skip_win32
def test_file_double_quote():
    """Basic %%writefile with embedded double quotes"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, '"file1"')
        ip.run_cell_magic(
            "writefile",
            fname,
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line2" in s


def test_file_var_expand():
    """%%writefile $filename"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, "file1")
        ip.user_ns["filename"] = fname
        ip.run_cell_magic(
            "writefile",
            "$filename",
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line2" in s


def test_file_unicode():
    """%%writefile with unicode cell"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, "file1")
        ip.run_cell_magic(
            "writefile",
            fname,
            "\n".join(
                [
                    "liné1",
                    "liné2",
                ]
            ),
        )
        with io.open(fname, encoding="utf-8") as f:
            s = f.read()
        assert "liné1\n" in s
        assert "liné2" in s


def test_file_amend():
    """%%writefile -a amends files"""
    ip = get_ipython()
    with TemporaryDirectory() as td:
        fname = os.path.join(td, "file2")
        ip.run_cell_magic(
            "writefile",
            fname,
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        ip.run_cell_magic(
            "writefile",
            "-a %s" % fname,
            "\n".join(
                [
                    "line3",
                    "line4",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line3\n" in s


def test_file_spaces():
    """%%file with spaces in filename"""
    ip = get_ipython()
    with TemporaryWorkingDirectory() as td:
        fname = "file name"
        ip.run_cell_magic(
            "file",
            '"%s"' % fname,
            "\n".join(
                [
                    "line1",
                    "line2",
                ]
            ),
        )
        s = Path(fname).read_text(encoding="utf-8")
        assert "line1\n" in s
        assert "line2" in s


def test_script_config():
    ip = get_ipython()
    ip.config.ScriptMagics.script_magics = ["whoda"]
    sm = script.ScriptMagics(shell=ip)
    assert "whoda" in sm.magics["cell"]


def _interrupt_after_1s():
    sleep(1)
    signal.raise_signal(signal.SIGINT)


def test_script_raise_on_interrupt():
    ip = get_ipython()

    with pytest.raises(CalledProcessError):
        thread = Thread(target=_interrupt_after_1s)
        thread.start()
        ip.run_cell_magic(
            "script", f"{sys.executable}", "from time import sleep; sleep(2)"
        )
        thread.join()


def test_script_do_not_raise_on_interrupt():
    ip = get_ipython()

    thread = Thread(target=_interrupt_after_1s)
    thread.start()
    ip.run_cell_magic(
        "script",
        f"--no-raise-error {sys.executable}",
        "from time import sleep; sleep(2)",
    )
    thread.join()


def test_script_out():
    ip = get_ipython()
    ip.run_cell_magic("script", f"--out output {sys.executable}", "print('hi')")
    assert ip.user_ns["output"].strip() == "hi"


def test_script_out_multiple_lines():
    ip = get_ipython()
    code = "print('hi')\nprint('this')\nprint('is')\nprint('ipython')"
    ip.run_cell_magic("script", f"--out output {sys.executable}", code)
    assert ip.user_ns["output"].strip().splitlines() == ["hi", "this", "is", "ipython"]


def test_script_err():
    ip = get_ipython()
    ip.run_cell_magic(
        "script",
        f"--err error {sys.executable}",
        "import sys; print('hello', file=sys.stderr)",
    )
    assert ip.user_ns["error"].strip() == "hello"


def test_script_out_err():
    ip = get_ipython()
    ip.run_cell_magic(
        "script",
        f"--out output --err error {sys.executable}",
        "\n".join(
            [
                "import sys",
                "print('hi')",
                "print('hello', file=sys.stderr)",
            ]
        ),
    )
    assert ip.user_ns["output"].strip() == "hi"
    assert ip.user_ns["error"].strip() == "hello"


@pytest.mark.asyncio
async def test_script_bg_out():
    ip = get_ipython()
    ip.run_cell_magic("script", f"--bg --out output {sys.executable}", "print('hi')")
    assert (await ip.user_ns["output"].read()).strip() == b"hi"
    assert ip.user_ns["output"].at_eof()


@pytest.mark.asyncio
async def test_script_bg_err():
    ip = get_ipython()
    ip.run_cell_magic(
        "script",
        f"--bg --err error {sys.executable}",
        "import sys; print('hello', file=sys.stderr)",
    )
    assert (await ip.user_ns["error"].read()).strip() == b"hello"
    assert ip.user_ns["error"].at_eof()


@pytest.mark.asyncio
async def test_script_bg_out_err():
    ip = get_ipython()
    ip.run_cell_magic(
        "script",
        f"--bg --out output --err error {sys.executable}",
        "\n".join(
            [
                "import sys",
                "print('hi')",
                "print('hello', file=sys.stderr)",
            ]
        ),
    )
    assert (await ip.user_ns["output"].read()).strip() == b"hi"
    assert (await ip.user_ns["error"].read()).strip() == b"hello"
    assert ip.user_ns["output"].at_eof()
    assert ip.user_ns["error"].at_eof()


@pytest.mark.asyncio
async def test_script_bg_proc():
    ip = get_ipython()
    ip.run_cell_magic(
        "script",
        f"--bg --out output --proc p {sys.executable}",
        "\n".join(
            [
                "import sys",
                "print('hi')",
                "print('hello', file=sys.stderr)",
            ]
        ),
    )
    p = ip.user_ns["p"]
    await p.wait()
    stdout = await p.stdout.read()
    stderr = await p.stderr.read()
    assert p.returncode == 0, (stdout, stderr)
    assert stdout.strip() == b"hi"
    # not captured, so empty
    assert (stderr) == b""
    assert p.stdout.at_eof()
    assert p.stderr.at_eof()


def test_script_defaults():
    ip = get_ipython()
    for cmd in ["sh", "bash", "perl", "ruby"]:
        try:
            find_cmd(cmd)
        except Exception:
            pass
        else:
            assert cmd in ip.magics_manager.magics["cell"]


@pytest.mark.asyncio
async def test_script_streams_continuously(capsys):
    ip = get_ipython()
    # Windows is slow to start up a thread on CI
    is_windows = os.name == "nt"
    step = 3 if is_windows else 1
    code = dedent(
        f"""\
    import time
    for _ in range(3):
        time.sleep({step})
        print(".", flush=True, end="")
    """
    )

    def print_numbers():
        for i in range(3):
            sleep(step)
            print(i, flush=True, end="")

    thread = Thread(target=print_numbers)
    thread.start()
    sleep(step / 2)
    ip.run_cell_magic("script", sys.executable, code)
    thread.join()

    captured = capsys.readouterr()
    # If the streaming was line-wise or broken
    # we would get `012...`
    assert captured.out == "0.1.2."


def test_script_streams_multibyte_unicode(capsys):
    ip = get_ipython()
    # € in UTF-8 is encoded using 3 bytes
    code = "print('€' * 1000, end='')"
    is_windows = os.name == "nt"
    command = sys.executable
    if is_windows:
        # windows does not use UTF for streams/pipes by default
        command += " -X utf8"
    ip.run_cell_magic("script", command, code)

    captured = capsys.readouterr()
    assert captured.out == "€" * 1000


@magics_class
class FooFoo(Magics):
    """class with both %foo and %%foo magics"""

    @line_magic("foo")
    def line_foo(self, line):
        "I am line foo"
        pass

    @cell_magic("foo")
    def cell_foo(self, line, cell):
        "I am cell foo, not line foo"
        pass


def test_line_cell_info():
    """%%foo and %foo magics are distinguishable to inspect"""
    ip = get_ipython()
    ip.magics_manager.register(FooFoo)
    oinfo = ip.object_inspect("foo")
    assert oinfo["found"] is True
    assert oinfo["ismagic"] is True

    oinfo = ip.object_inspect("%%foo")
    assert oinfo["found"] is True
    assert oinfo["ismagic"] is True
    assert oinfo["docstring"] == FooFoo.cell_foo.__doc__

    oinfo = ip.object_inspect("%foo")
    assert oinfo["found"] is True
    assert oinfo["ismagic"] is True
    assert oinfo["docstring"] == FooFoo.line_foo.__doc__


def test_multiple_magics():
    ip = get_ipython()
    foo1 = FooFoo(ip)
    foo2 = FooFoo(ip)
    mm = ip.magics_manager
    mm.register(foo1)
    assert mm.magics["line"]["foo"].__self__ is foo1
    mm.register(foo2)
    assert mm.magics["line"]["foo"].__self__ is foo2


def test_alias_magic():
    """Test %alias_magic."""
    ip = get_ipython()
    mm = ip.magics_manager

    # Basic operation: both cell and line magics are created, if possible.
    ip.run_line_magic("alias_magic", "timeit_alias timeit")
    assert "timeit_alias" in mm.magics["line"]
    assert "timeit_alias" in mm.magics["cell"]

    # --cell is specified, line magic not created.
    ip.run_line_magic("alias_magic", "--cell timeit_cell_alias timeit")
    assert "timeit_cell_alias" not in mm.magics["line"]
    assert "timeit_cell_alias" in mm.magics["cell"]

    # Test that line alias is created successfully.
    ip.run_line_magic("alias_magic", "--line env_alias env")
    assert ip.run_line_magic("env", "") == ip.run_line_magic("env_alias", "")

    # Test that line alias with parameters passed in is created successfully.
    ip.run_line_magic(
        "alias_magic", "--line history_alias history --params " + shlex.quote("3")
    )
    assert "history_alias" in mm.magics["line"]


def test_save():
    """Test %save."""
    ip = get_ipython()
    ip.history_manager.reset()  # Clear any existing history.
    cmds = ["a=1", "def b():\n  return a**2", "print(a, b())"]
    for i, cmd in enumerate(cmds, start=1):
        ip.history_manager.store_inputs(i, cmd)
    with TemporaryDirectory() as tmpdir:
        file = os.path.join(tmpdir, "testsave.py")
        ip.run_line_magic("save", "%s 1-10" % file)
        content = Path(file).read_text(encoding="utf-8")
        assert content.count(cmds[0]) == 1
        assert "coding: utf-8" in content
        ip.run_line_magic("save", "-a %s 1-10" % file)
        content = Path(file).read_text(encoding="utf-8")
        assert content.count(cmds[0]) == 2
        assert "coding: utf-8" in content


def test_save_with_no_args():
    ip = get_ipython()
    ip.history_manager.reset()  # Clear any existing history.
    cmds = ["a=1", "def b():\n    return a**2", "print(a, b())", "%save"]
    for i, cmd in enumerate(cmds, start=1):
        ip.history_manager.store_inputs(i, cmd)

    with TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "testsave.py")
        ip.run_line_magic("save", path)
        content = Path(path).read_text(encoding="utf-8")
        expected_content = dedent(
            """\
            # coding: utf-8
            a=1
            def b():
                return a**2
            print(a, b())
            """
        )
        assert content == expected_content


def test_store():
    """Test %store."""
    ip = get_ipython()
    ip.run_line_magic("load_ext", "storemagic")

    # make sure the storage is empty
    ip.run_line_magic("store", "-z")
    ip.user_ns["var"] = 42
    ip.run_line_magic("store", "var")
    ip.user_ns["var"] = 39
    ip.run_line_magic("store", "-r")
    assert ip.user_ns["var"] == 42

    ip.run_line_magic("store", "-d var")
    ip.user_ns["var"] = 39
    ip.run_line_magic("store", "-r")
    assert ip.user_ns["var"] == 39


def _run_edit_test(
    arg_s, exp_filename=None, exp_lineno=-1, exp_contents=None, exp_is_temp=None
):
    ip = get_ipython()
    M = code.CodeMagics(ip)
    last_call = ["", ""]
    opts, args = M.parse_options(arg_s, "prxn:")
    filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)

    if exp_filename is not None:
        assert exp_filename == filename
    if exp_contents is not None:
        with io.open(filename, "r", encoding="utf-8") as f:
            contents = f.read()
        assert exp_contents == contents
    if exp_lineno != -1:
        assert exp_lineno == lineno
    if exp_is_temp is not None:
        assert exp_is_temp == is_temp


def test_edit_interactive():
    """%edit on interactively defined objects"""
    ip = get_ipython()
    n = ip.execution_count
    ip.run_cell("def foo(): return 1", store_history=True)

    with pytest.raises(code.InteractivelyDefined) as e:
        _run_edit_test("foo")
    assert e.value.index == n


def test_edit_cell():
    """%edit [cell id]"""
    ip = get_ipython()

    ip.run_cell("def foo(): return 1", store_history=True)

    # test
    _run_edit_test("1", exp_contents=ip.user_ns["In"][1], exp_is_temp=True)


def test_edit_fname():
    """%edit file"""
    # test
    _run_edit_test("test file.py", exp_filename="test file.py")


def test_bookmark():
    ip = get_ipython()
    ip.run_line_magic("bookmark", "bmname")
    with tt.AssertPrints("bmname"):
        ip.run_line_magic("bookmark", "-l")
    ip.run_line_magic("bookmark", "-d bmname")


def test_ls_magic():
    ip = get_ipython()
    json_formatter = ip.display_formatter.formatters["application/json"]
    json_formatter.enabled = True
    lsmagic = ip.run_line_magic("lsmagic", "")
    with warnings.catch_warnings(record=True) as w:
        j = json_formatter(lsmagic)
    assert sorted(j) == ["cell", "line"]
    assert w == []  # no warnings


def test_strip_initial_indent():
    def sii(s):
        lines = s.splitlines()
        return "\n".join(code.strip_initial_indent(lines))

    assert sii("  a = 1\nb = 2") == "a = 1\nb = 2"
    assert sii("  a\n    b\nc") == "a\n  b\nc"
    assert sii("a\n  b") == "a\n  b"


def test_logging_magic_quiet_from_arg():
    _ip.config.LoggingMagics.quiet = False
    lm = logging.LoggingMagics(shell=_ip)
    with TemporaryDirectory() as td:
        try:
            with tt.AssertNotPrints(re.compile("Activating.*")):
                lm.logstart("-q {}".format(os.path.join(td, "quiet_from_arg.log")))
        finally:
            _ip.logger.logstop()


def test_logging_magic_quiet_from_config():
    _ip.config.LoggingMagics.quiet = True
    lm = logging.LoggingMagics(shell=_ip)
    with TemporaryDirectory() as td:
        try:
            with tt.AssertNotPrints(re.compile("Activating.*")):
                lm.logstart(os.path.join(td, "quiet_from_config.log"))
        finally:
            _ip.logger.logstop()


def test_logging_magic_not_quiet():
    _ip.config.LoggingMagics.quiet = False
    lm = logging.LoggingMagics(shell=_ip)
    with TemporaryDirectory() as td:
        try:
            with tt.AssertPrints(re.compile("Activating.*")):
                lm.logstart(os.path.join(td, "not_quiet.log"))
        finally:
            _ip.logger.logstop()


def test_time_no_var_expand():
    _ip.user_ns["a"] = 5
    _ip.user_ns["b"] = []
    _ip.run_line_magic("time", 'b.append("{a}")')
    assert _ip.user_ns["b"] == ["{a}"]


# this is slow, put at the end for local testing.
def test_timeit_arguments():
    "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
    _ip.run_line_magic("timeit", "-n1 -r1 a=('#')")


def test_time_raise_on_interrupt():
    ip = get_ipython()

    with pytest.raises(KeyboardInterrupt):
        thread = Thread(target=_interrupt_after_1s)
        thread.start()
        ip.run_cell_magic("time", "", "from time import sleep; sleep(2)")
        thread.join()


MINIMAL_LAZY_MAGIC = """
from IPython.core.magic import (
    Magics,
    magics_class,
    line_magic,
    cell_magic,
)


@magics_class
class LazyMagics(Magics):
    @line_magic
    def lazy_line(self, line):
        print("Lazy Line")

    @cell_magic
    def lazy_cell(self, line, cell):
        print("Lazy Cell")


def load_ipython_extension(ipython):
    ipython.register_magics(LazyMagics)
"""


def test_lazy_magics():
    with pytest.raises(UsageError):
        ip.run_line_magic("lazy_line", "")

    startdir = os.getcwd()

    with TemporaryDirectory() as tmpdir:
        with prepended_to_syspath(tmpdir):
            ptempdir = Path(tmpdir)
            tf = ptempdir / "lazy_magic_module.py"
            tf.write_text(MINIMAL_LAZY_MAGIC)
            ip.magics_manager.register_lazy("lazy_line", Path(tf.name).name[:-3])
            with tt.AssertPrints("Lazy Line"):
                ip.run_line_magic("lazy_line", "")


TEST_MODULE = """
print('Loaded my_tmp')
if __name__ == "__main__":
    print('I just ran a script')
"""


def test_run_module_from_import_hook():
    "Test that a module can be loaded via an import hook"
    with TemporaryDirectory() as tmpdir:
        fullpath = os.path.join(tmpdir, "my_tmp.py")
        Path(fullpath).write_text(TEST_MODULE, encoding="utf-8")

        import importlib.abc
        import importlib.util

        class MyTempImporter(importlib.abc.MetaPathFinder, importlib.abc.SourceLoader):
            def find_spec(self, fullname, path, target=None):
                if fullname == "my_tmp":
                    return importlib.util.spec_from_loader(fullname, self)

            def get_filename(self, fullname):
                assert fullname == "my_tmp"
                return fullpath

            def get_data(self, path):
                assert Path(path).samefile(fullpath)
                return Path(fullpath).read_text(encoding="utf-8")

        sys.meta_path.insert(0, MyTempImporter())

        with capture_output() as captured:
            _ip.run_line_magic("run", "-m my_tmp")
            _ip.run_cell("import my_tmp")

        output = "Loaded my_tmp\nI just ran a script\nLoaded my_tmp\n"
        assert output == captured.stdout

        sys.meta_path.pop(0)
